package com.shujia.flink.core

import org.apache.flink.streaming.api.scala._

object Demo3tParallelism {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * 设置flink任务的并行度
     * 1、在代码中同意设置并行度
     * 2、可以在体检任务的时候设置并行度   代码中设置并行度的优先级高于提交热任务是设置的并行度
     * 2、每一个算子可以单独设置并行度，keyBy除外  -- 优先级时最高的
     *
     *
     * flink 任务需要申请的资源和并行度有关，和task的数量没有关系
     *
     */
    //env.setParallelism(2)

    //从socket中读取数据的并行度只能是1
    val linesDS: DataStream[String] = env
      .socketTextStream("master", 8888)
      .setParallelism(1)
      .name("读取socket中的数据") //每一个节点可以设置一个名字
      .uid("1") //为一个标识
      .shuffle //将前面拆分成两部分，并行度一样时也会拆分


    val wordsDS: DataStream[String] = linesDS
      .flatMap(_.split(","))
      .setParallelism(2)
      .name("将一行数据转换成多行")
      .uid("2")

    val kvDS: DataStream[(String, Int)] = wordsDS
      .map((_, 1))
      .setParallelism(3)
      .name("转换成kv格式")
      .uid("3")

    val keyByDs: KeyedStream[(String, Int), String] = kvDS
      .keyBy(_._1)


    val countDS: DataStream[(String, Int)] = keyByDs
      .sum(1)
      .setParallelism(4)
      .name("分组聚合")
      .uid("4")

    countDS.print()
      .setParallelism(1)
      .name("打印结果")
      .uid("5")

    env.execute()
  }

}
